{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "# WGAN-GP overriding `Model.train_step`\n",
    "\n",
    "**Author:** [A_K_Nain](https://twitter.com/A_K_Nain)<br>\n",
    "**Date created:** 2020/05/9<br>\n",
    "**Last modified:** 2020/05/9<br>\n",
    "**Description:** Implementation of Wasserstein GAN with Gradient Penalty."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Wasserstein GAN (WGAN) with Gradient Penalty (GP)\n",
    "\n",
    "The original [Wasserstein GAN](https://arxiv.org/abs/1701.07875) leverages\n",
    "the Wasserstein distance to produce a value function that has better theoretical\n",
    "properties than the value function used in the original GAN paper. WGAN requires that\n",
    "the discriminator (aka the critic) lie within the space of 1-Lipschitz functions.\n",
    "The authors proposed the idea of weight clipping to achieve this constraint. Though\n",
    "weight clipping works, it can be a problematic way to enforce 1-Lipschitz constraint\n",
    "and can cause undesirable behavior, e.g. a very deep WGAN discriminator (critic)\n",
    "often fails to converge.\n",
    "\n",
    "[WGAN-GP](https://arxiv.org/pdf/1704.00028.pdf) proposed an alternative to weight\n",
    "clipping to ensure smooth training. Instead of clipping the weights, the authors\n",
    "proposed a \"gradient penalty\": adding a loss term that keeps the L2\n",
    "norm of the discriminator gradients close to 1.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Setup\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import tensorflow as tf\n",
    "from tensorflow import keras\n",
    "from tensorflow.keras import layers\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Prepare Fashion-MNIST data\n",
    "\n",
    "We will be using the [Fashion-MNIST](https://github.com/zalandoresearch/fashion-mnist) dataset\n",
    "in this example to demonstrate the training of WGAN-GP. Each sample in this dataset is a 28x28\n",
    "grayscale image associated with a label from 10 classes (e.g. Trouser, Pullover, Sneaker, etc.)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "IMG_SHAPE = (28, 28, 1)\n",
    "BATCH_SIZE = 512\n",
    "\n",
    "# Size of noise vector\n",
    "noise_dim = 128\n",
    "\n",
    "fashion_mnist = keras.datasets.fashion_mnist\n",
    "(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()\n",
    "print(f\"Number of examples: {len(train_images)}\")\n",
    "print(f\"Shape of the images in the dataset: {train_images.shape[1:]}\")\n",
    "\n",
    "# we will reshape each sample to (28, 28, 1) and normalize the pixel values in [-1, 1].\n",
    "train_images = train_images.reshape(train_images.shape[0], *IMG_SHAPE).astype(\"float32\")\n",
    "train_images = (train_images - 127.5) / 127.5\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Create the discriminator (aka critic in the original WGAN)\n",
    "\n",
    "The samples in the dataset have shape `(28, 28, 1)`. As we will be\n",
    "using strided convolutions, this can result in a shape with odd dimensions.\n",
    "For example,\n",
    "`(28, 28) -> Conv_s2 -> (14, 14) -> Conv_s2 -> (7, 7) -> Conv_s2 ->(3, 3)`.\n",
    "\n",
    "While doing upsampling in the generator, we won't get the same input shape\n",
    "as the original images if we aren't careful. To avoid this, we will do\n",
    "something much simpler. In the discriminator, we will \"zero pad\" the input\n",
    "to make the shape `(32, 32, 1)` for each sample, while in the generator we will\n",
    "crop the final output to match the shape with input shape.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "\n",
    "def conv_block(\n",
    "    x,\n",
    "    filters,\n",
    "    activation,\n",
    "    kernel_size=(3, 3),\n",
    "    strides=(1, 1),\n",
    "    padding=\"same\",\n",
    "    use_bias=True,\n",
    "    use_bn=False,\n",
    "    use_dropout=False,\n",
    "    drop_value=0.5,\n",
    "):\n",
    "    x = layers.Conv2D(\n",
    "        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias\n",
    "    )(x)\n",
    "    if use_bn:\n",
    "        x = layers.BatchNormalization()(x)\n",
    "    x = activation(x)\n",
    "    if use_dropout:\n",
    "        x = layers.Dropout(drop_value)(x)\n",
    "    return x\n",
    "\n",
    "\n",
    "def get_discriminator_model():\n",
    "    img_input = layers.Input(shape=IMG_SHAPE)\n",
    "    # Zero pad the input to make the input images size to (32, 32, 1).\n",
    "    x = layers.ZeroPadding2D((2, 2))(img_input)\n",
    "    x = conv_block(\n",
    "        x,\n",
    "        64,\n",
    "        kernel_size=(5, 5),\n",
    "        strides=(2, 2),\n",
    "        use_bn=False,\n",
    "        use_bias=True,\n",
    "        activation=layers.LeakyReLU(0.2),\n",
    "        use_dropout=False,\n",
    "        drop_value=0.3,\n",
    "    )\n",
    "    x = conv_block(\n",
    "        x,\n",
    "        128,\n",
    "        kernel_size=(5, 5),\n",
    "        strides=(2, 2),\n",
    "        use_bn=False,\n",
    "        activation=layers.LeakyReLU(0.2),\n",
    "        use_bias=True,\n",
    "        use_dropout=True,\n",
    "        drop_value=0.3,\n",
    "    )\n",
    "    x = conv_block(\n",
    "        x,\n",
    "        256,\n",
    "        kernel_size=(5, 5),\n",
    "        strides=(2, 2),\n",
    "        use_bn=False,\n",
    "        activation=layers.LeakyReLU(0.2),\n",
    "        use_bias=True,\n",
    "        use_dropout=True,\n",
    "        drop_value=0.3,\n",
    "    )\n",
    "    x = conv_block(\n",
    "        x,\n",
    "        512,\n",
    "        kernel_size=(5, 5),\n",
    "        strides=(2, 2),\n",
    "        use_bn=False,\n",
    "        activation=layers.LeakyReLU(0.2),\n",
    "        use_bias=True,\n",
    "        use_dropout=False,\n",
    "        drop_value=0.3,\n",
    "    )\n",
    "\n",
    "    x = layers.Flatten()(x)\n",
    "    x = layers.Dropout(0.2)(x)\n",
    "    x = layers.Dense(1)(x)\n",
    "\n",
    "    d_model = keras.models.Model(img_input, x, name=\"discriminator\")\n",
    "    return d_model\n",
    "\n",
    "\n",
    "d_model = get_discriminator_model()\n",
    "d_model.summary()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Create the generator\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "\n",
    "def upsample_block(\n",
    "    x,\n",
    "    filters,\n",
    "    activation,\n",
    "    kernel_size=(3, 3),\n",
    "    strides=(1, 1),\n",
    "    up_size=(2, 2),\n",
    "    padding=\"same\",\n",
    "    use_bn=False,\n",
    "    use_bias=True,\n",
    "    use_dropout=False,\n",
    "    drop_value=0.3,\n",
    "):\n",
    "    x = layers.UpSampling2D(up_size)(x)\n",
    "    x = layers.Conv2D(\n",
    "        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias\n",
    "    )(x)\n",
    "\n",
    "    if use_bn:\n",
    "        x = layers.BatchNormalization()(x)\n",
    "\n",
    "    if activation:\n",
    "        x = activation(x)\n",
    "    if use_dropout:\n",
    "        x = layers.Dropout(drop_value)(x)\n",
    "    return x\n",
    "\n",
    "\n",
    "def get_generator_model():\n",
    "    noise = layers.Input(shape=(noise_dim,))\n",
    "    x = layers.Dense(4 * 4 * 256, use_bias=False)(noise)\n",
    "    x = layers.BatchNormalization()(x)\n",
    "    x = layers.LeakyReLU(0.2)(x)\n",
    "\n",
    "    x = layers.Reshape((4, 4, 256))(x)\n",
    "    x = upsample_block(\n",
    "        x,\n",
    "        128,\n",
    "        layers.LeakyReLU(0.2),\n",
    "        strides=(1, 1),\n",
    "        use_bias=False,\n",
    "        use_bn=True,\n",
    "        padding=\"same\",\n",
    "        use_dropout=False,\n",
    "    )\n",
    "    x = upsample_block(\n",
    "        x,\n",
    "        64,\n",
    "        layers.LeakyReLU(0.2),\n",
    "        strides=(1, 1),\n",
    "        use_bias=False,\n",
    "        use_bn=True,\n",
    "        padding=\"same\",\n",
    "        use_dropout=False,\n",
    "    )\n",
    "    x = upsample_block(\n",
    "        x, 1, layers.Activation(\"tanh\"), strides=(1, 1), use_bias=False, use_bn=True\n",
    "    )\n",
    "    # At this point, we have an output which has the same shape as the input, (32, 32, 1).\n",
    "    # We will use a Cropping2D layer to make it (28, 28, 1).\n",
    "    x = layers.Cropping2D((2, 2))(x)\n",
    "\n",
    "    g_model = keras.models.Model(noise, x, name=\"generator\")\n",
    "    return g_model\n",
    "\n",
    "\n",
    "g_model = get_generator_model()\n",
    "g_model.summary()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Create a WGAN-GP model\n",
    "\n",
    "Now that we have defined our generator and discriminator models, we will\n",
    "implement the WGAN-GP model. We will override the `train_step` for training.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "\n",
    "class WGAN(keras.Model):\n",
    "    def __init__(\n",
    "        self,\n",
    "        discriminator,\n",
    "        generator,\n",
    "        latent_dim,\n",
    "        discriminator_extra_steps=3,\n",
    "        gp_weight=10.0,\n",
    "    ):\n",
    "        super(WGAN, self).__init__()\n",
    "        self.discriminator = discriminator\n",
    "        self.generator = generator\n",
    "        self.latent_dim = latent_dim\n",
    "        self.d_steps = discriminator_extra_steps\n",
    "        self.gp_weight = gp_weight\n",
    "\n",
    "    def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):\n",
    "        super(WGAN, self).compile()\n",
    "        self.d_optimizer = d_optimizer\n",
    "        self.g_optimizer = g_optimizer\n",
    "        self.d_loss_fn = d_loss_fn\n",
    "        self.g_loss_fn = g_loss_fn\n",
    "\n",
    "    def gradient_penalty(self, batch_size, real_images, fake_images):\n",
    "        \"\"\" Calculates the gradient penalty.\n",
    "\n",
    "        This loss is calculated on an interpolated image\n",
    "        and added to the discriminator loss.\n",
    "        \"\"\"\n",
    "        # get the interplated image\n",
    "        alpha = tf.random.normal([batch_size, 1, 1, 1], 0.0, 1.0)\n",
    "        diff = fake_images - real_images\n",
    "        interpolated = real_images + alpha * diff\n",
    "\n",
    "        with tf.GradientTape() as gp_tape:\n",
    "            gp_tape.watch(interpolated)\n",
    "            # 1. Get the discriminator output for this interpolated image.\n",
    "            pred = self.discriminator(interpolated, training=True)\n",
    "\n",
    "        # 2. Calculate the gradients w.r.t to this interpolated image.\n",
    "        grads = gp_tape.gradient(pred, [interpolated])[0]\n",
    "        # 3. Calcuate the norm of the gradients\n",
    "        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))\n",
    "        gp = tf.reduce_mean((norm - 1.0) ** 2)\n",
    "        return gp\n",
    "\n",
    "    def train_step(self, real_images):\n",
    "        if isinstance(real_images, tuple):\n",
    "            real_images = real_images[0]\n",
    "\n",
    "        # Get the batch size\n",
    "        batch_size = tf.shape(real_images)[0]\n",
    "\n",
    "        # For each batch, we are going to perform the\n",
    "        # following steps as laid out in the original paper.\n",
    "        # 1. Train the generator and get the generator loss\n",
    "        # 2. Train the discriminator and get the discriminator loss\n",
    "        # 3. Calculate the gradient penalty\n",
    "        # 4. Multiply this gradient penalty with a constant weight factor\n",
    "        # 5. Add gradient penalty to the discriminator loss\n",
    "        # 6. Return generator and discriminator losses as a loss dictionary.\n",
    "\n",
    "        # Train discriminator first. The original paper recommends training\n",
    "        # the discriminator for `x` more steps (typically 5) as compared to\n",
    "        # one step of the generator. Here we will train it for 3 extra steps\n",
    "        # as compared to 5 to reduce the training time.\n",
    "        for i in range(self.d_steps):\n",
    "            # Get the latent vector\n",
    "            random_latent_vectors = tf.random.normal(\n",
    "                shape=(batch_size, self.latent_dim)\n",
    "            )\n",
    "            with tf.GradientTape() as tape:\n",
    "                # Generate fake images from the latent vector\n",
    "                fake_images = self.generator(random_latent_vectors, training=True)\n",
    "                # Get the logits for the fake images\n",
    "                fake_logits = self.discriminator(fake_images, training=True)\n",
    "                # Get the logits for real images\n",
    "                real_logits = self.discriminator(real_images, training=True)\n",
    "\n",
    "                # Calculate discriminator loss using fake and real logits\n",
    "                d_cost = self.d_loss_fn(real_img=real_logits, fake_img=fake_logits)\n",
    "                # Calculate the gradient penalty\n",
    "                gp = self.gradient_penalty(batch_size, real_images, fake_images)\n",
    "                # Add the gradient penalty to the original discriminator loss\n",
    "                d_loss = d_cost + gp * self.gp_weight\n",
    "\n",
    "            # Get the gradients w.r.t the discriminator loss\n",
    "            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)\n",
    "            # Update the weights of the discriminator using the discriminator optimizer\n",
    "            self.d_optimizer.apply_gradients(\n",
    "                zip(d_gradient, self.discriminator.trainable_variables)\n",
    "            )\n",
    "\n",
    "        # Train the generator now.\n",
    "        # Get the latent vector\n",
    "        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))\n",
    "        with tf.GradientTape() as tape:\n",
    "            # Generate fake images using the generator\n",
    "            generated_images = self.generator(random_latent_vectors, training=True)\n",
    "            # Get the discriminator logits for fake images\n",
    "            gen_img_logits = self.discriminator(generated_images, training=True)\n",
    "            # Calculate the generator loss\n",
    "            g_loss = self.g_loss_fn(gen_img_logits)\n",
    "\n",
    "        # Get the gradients w.r.t the generator loss\n",
    "        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)\n",
    "        # Update the weights of the generator using the generator optimizer\n",
    "        self.g_optimizer.apply_gradients(\n",
    "            zip(gen_gradient, self.generator.trainable_variables)\n",
    "        )\n",
    "        return {\"d_loss\": d_loss, \"g_loss\": g_loss}\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Create a callback that periodically saves generated images\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "\n",
    "class GANMonitor(keras.callbacks.Callback):\n",
    "    def __init__(self, num_img=6, latent_dim=128):\n",
    "        self.num_img = num_img\n",
    "        self.latent_dim = latent_dim\n",
    "\n",
    "    def on_epoch_end(self, epoch, logs=None):\n",
    "        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))\n",
    "        generated_images = self.model.generator(random_latent_vectors)\n",
    "        generated_images = (generated_images * 127.5) + 127.5\n",
    "\n",
    "        for i in range(self.num_img):\n",
    "            img = generated_images[i].numpy()\n",
    "            img = keras.preprocessing.image.array_to_img(img)\n",
    "            img.save(\"generated_img_{i}_{epoch}.png\".format(i=i, epoch=epoch))\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Train the end-to-end model\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "# Optimizer for both the networks\n",
    "# learning_rate=0.0002, beta_1=0.5 are recommened\n",
    "generator_optimizer = keras.optimizers.Adam(\n",
    "    learning_rate=0.0002, beta_1=0.5, beta_2=0.9\n",
    ")\n",
    "discriminator_optimizer = keras.optimizers.Adam(\n",
    "    learning_rate=0.0002, beta_1=0.5, beta_2=0.9\n",
    ")\n",
    "\n",
    "# Define the loss functions to be used for discrimiator\n",
    "# This should be (fake_loss - real_loss)\n",
    "# We will add the gradient penalty later to this loss function\n",
    "def discriminator_loss(real_img, fake_img):\n",
    "    real_loss = tf.reduce_mean(real_img)\n",
    "    fake_loss = tf.reduce_mean(fake_img)\n",
    "    return fake_loss - real_loss\n",
    "\n",
    "\n",
    "# Define the loss functions to be used for generator\n",
    "def generator_loss(fake_img):\n",
    "    return -tf.reduce_mean(fake_img)\n",
    "\n",
    "\n",
    "# Epochs to train\n",
    "epochs = 20\n",
    "\n",
    "# Callbacks\n",
    "cbk = GANMonitor(num_img=3, latent_dim=noise_dim)\n",
    "\n",
    "# Get the wgan model\n",
    "wgan = WGAN(\n",
    "    discriminator=d_model,\n",
    "    generator=g_model,\n",
    "    latent_dim=noise_dim,\n",
    "    discriminator_extra_steps=3,\n",
    ")\n",
    "\n",
    "# Compile the wgan model\n",
    "wgan.compile(\n",
    "    d_optimizer=discriminator_optimizer,\n",
    "    g_optimizer=generator_optimizer,\n",
    "    g_loss_fn=generator_loss,\n",
    "    d_loss_fn=discriminator_loss,\n",
    ")\n",
    "\n",
    "# Start training\n",
    "wgan.fit(train_images, batch_size=BATCH_SIZE, epochs=epochs, callbacks=[cbk])\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "Display the last generated images:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "from IPython.display import Image, display\n",
    "\n",
    "display(Image(\"generated_img_0_19.png\"))\n",
    "display(Image(\"generated_img_1_19.png\"))\n",
    "display(Image(\"generated_img_2_19.png\"))\n"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "wgan_gp",
   "private_outputs": false,
   "provenance": [],
   "toc_visible": true
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}